import os
import sys
sys.path.append(os.path.abspath(os.path.dirname(__file__) + '/' + './.' + '../..'))

import numpy as np
from numpy.testing import assert_allclose
import pandas as pd
import statsmodels
import statsmodels.api as sm
import statsmodels.formula.api as smf

from warnings import simplefilter
simplefilter(action="ignore", category=pd.errors.PerformanceWarning)

from python.numerical.functions import (
    get_kalman_gain,
    simulate_target_variable,
    generate_forecasts,
    simulate_dataset,
    get_autocovar_MA,
    get_MA_coefs_sum,
    get_IRF_ARMA22,
    calculate_shock_specific_bias_coefs,
    calculate_composite_bias_coefs,
    calculate_bias_coefs,
    calculate_composite_bias_coefs_noisy,
    fit_model
)


def test_kalman_gain():
    np.random.seed(20182018)

    # Check whether the theoretical value of
    # the Kalman gain matches the one calculated
    # using a standard implementation of the Kalman
    # filter (statsmodels)

    # Parameters of DGP
    rho = 0.8
    sigma_eps = 1.5
    sigma_omega = 3.0

    df = simulate_target_variable(rho = rho, 
                       sigma_eps = sigma_eps, 
                       sigma_omega = sigma_omega, 
                       T = 1000 + 5000)

    # Run Kalman filter using statsmodels

    # Construct matrices for canonical form
    # of the state-space model
    Z = np.array([[1]])
    H = np.array([[sigma_omega ** 2]])
    T = np.array([[rho]])
    R = np.array([[1]])
    Q = np.array([[sigma_eps ** 2]])

    # Create statsmodels class for Kalman filtering
    class KW_model(sm.tsa.statespace.MLEModel):
        def __init__(self, endog):
            # Initialize the state space model
            super(KW_model, self).__init__(endog, 
                                           k_states = 1,
                                           initialization='stationary')

            # Setup the fixed components of the state space representation
            self['design'] = Z
            self['transition'] = T
            self['selection'] = R
            self['obs_cov'] = H
            self['state_cov'] = Q

    # Run Kalman filter
    mod = KW_model(df['s'].values)
    res_KF = mod.filter(params = [])
    df['state_KF'] = res_KF.filtered_state[0,:]
    df['forecast_KF'] = rho * df['state_KF']
    df['forecast_KF_lag'] = df['forecast_KF'].shift(1)

    # Remove initial observations as a "burn-in period"
    df = df.iloc[1000:, ]

    # Run regression to back out implied Kalman gain
    results = smf.ols('forecast_KF ~ forecast_KF_lag + s', data = df).fit()
    G_implied = results.params['s'] / rho
    G_theoretical = get_kalman_gain(rho, sigma_eps, sigma_omega)
    assert_allclose(G_implied, G_theoretical, 0, atol = 1e-8)


def test_kalman_gain_2():
    np.random.seed(20182018)

    # Check whether the theoretical value of
    # the Kalman gain matches the one calculated
    # using a standard implementation of the Kalman
    # filter (statsmodels)

    # This test uses a different implied restriction

    # Parameters of DGP
    rho = 0.8
    sigma_eps = 1.5
    sigma_omega = 3.0

    df = simulate_target_variable(rho = rho, 
                       sigma_eps = sigma_eps, 
                       sigma_omega = sigma_omega, 
                       T = 1000 + 5000)

    # Run Kalman filter using statsmodels

    # Construct matrices for canonical form
    # of the state-space model
    Z = np.array([[1]])
    H = np.array([[sigma_omega ** 2]])
    T = np.array([[rho]])
    R = np.array([[1]])
    Q = np.array([[sigma_eps ** 2]])

    # Create statsmodels class for Kalman filtering
    class KW_model(sm.tsa.statespace.MLEModel):
        def __init__(self, endog):
            # Initialize the state space model
            super(KW_model, self).__init__(endog, 
                                           k_states = 1,
                                           initialization='stationary')

            # Setup the fixed components of the state space representation
            self['design'] = Z
            self['transition'] = T
            self['selection'] = R
            self['obs_cov'] = H
            self['state_cov'] = Q

    # Run Kalman filter
    mod = KW_model(df['s'].values)
    res_KF = mod.filter(params = [])
    df['state_KF'] = res_KF.filtered_state[0,:]
    df['forecast_KF'] = rho * df['state_KF']
    df['forecast_KF_lag'] = df['forecast_KF'].shift(1)

    # Remove initial observations as a "burn-in period"
    df = df.iloc[1000:, ]

    # Run regression to back out implied Kalman gain
    results = smf.ols('forecast_KF ~ forecast_KF_lag + s', data = df).fit()
    G_implied = 1 - results.params['forecast_KF_lag'] / rho
    G_theoretical = get_kalman_gain(rho, sigma_eps, sigma_omega)
    assert_allclose(G_implied, G_theoretical, 0, atol = 1e-8)


def test_non_diagnostic_component():
    # Check that the non-diagnostic component
    # of expectations matches forecasts generated
    # by the Kalman filter numerically

    # Parameters of DGP
    params_DGP = {'rho': 0.8,
                  'sigma_eps': 1.5,
                  'sigma_omega': 3.0,
                  'T': 1000 + 5000}

    # Expectations parameters
    params_expectations = {'rho_perceived': 0.90,
                           'sigma_eps_perceived': 1.5,
                           'sigma_omega_perceived': 0.25,
                           'theta': 0.50}

    # Generate data
    df = simulate_dataset(params_DGP, params_expectations)

    # Run Kalman filter using statsmodels

    # Construct matrices for canonical form
    # of the state-space model
    Z = np.array([[1]])
    H = np.array([[params_expectations['sigma_omega_perceived'] ** 2]])
    T = np.array([[params_expectations['rho_perceived']]])
    R = np.array([[1]])
    Q = np.array([[params_expectations['sigma_eps_perceived'] ** 2]])

    # Create statsmodels class for Kalman filtering
    class KW_model(sm.tsa.statespace.MLEModel):
        def __init__(self, endog):
            # Initialize the state space model
            super(KW_model, self).__init__(endog, 
                                           k_states = 1,
                                           initialization='stationary')

            # Setup the fixed components of the state space representation
            self['design'] = Z
            self['transition'] = T
            self['selection'] = R
            self['obs_cov'] = H
            self['state_cov'] = Q

    # Run Kalman filter
    mod = KW_model(df['s'].values)
    res_KF = mod.filter(params = [])
    df['state_KF'] = res_KF.filtered_state[0,:]
    df['forecast_KF'] = params_expectations['rho_perceived'] * df['state_KF']
    df['forecast_KF'] = df['forecast_KF'].shift(1) # Align with date for which forecasted

    # Remove initial observations as a "burn-in period"
    df = df.iloc[1000:, ]
    assert_allclose(df['forecast_KF'].values, df['forecast_non_diag'].values)


def test_diagnostic_component():
    # Check that diagnostic expectations
    # are correctly calculated in the special
    # case with no information friction (sigma_omega = 0)
    # and all parameter values are correctly
    # perceived

    # Parameters of DGP
    params_DGP = {'rho': 0.8,
                  'sigma_eps': 1.5,
                  'sigma_omega': 1e-10,
                  'T': 1000 + 5000}

    # Expectations parameters
    params_expectations = {'rho_perceived': params_DGP['rho'],
                           'sigma_eps_perceived': params_DGP['sigma_eps'],
                           'sigma_omega_perceived': params_DGP['sigma_omega'],
                           'theta': 0.50}

    # Generate data
    df = simulate_dataset(params_DGP, params_expectations)

    # Remove initial observations as a "burn-in period"
    df = df.iloc[1000:, ]

    # Check whether forecast errors for the
    # model follow the prediction
    test_1 =  (df['forecast_error'] - df['eps']) / (- params_expectations['theta'] * params_DGP['rho'])
    test_2 = df['eps'].shift(1)
    assert_allclose(test_1.values[1:], test_2.values[1:], atol = 1e-8)


def test_diagnostic_component_2():
    # Check that diagnostic expectations
    # are correctly calculated in the special
    # case with an information friction
    # when all parameter values are correctly
    # perceived

    # Parameters of DGP
    params_DGP = {'rho': 0.8,
                  'sigma_eps': 1.5,
                  'sigma_omega': 3.5,
                  'T': 1000 + 5000}

    # Expectations parameters
    params_expectations = {'rho_perceived': params_DGP['rho'],
                           'sigma_eps_perceived': params_DGP['sigma_eps'],
                           'sigma_omega_perceived': params_DGP['sigma_omega'],
                           'theta': 0.50}

    # Generate data
    df = simulate_dataset(params_DGP, params_expectations)
    df['forecast_error_lag'] = df['forecast_error'].shift(1)
    df['eps_lag'] = df['eps'].shift(1)
    df['omega_lag1'] = df['omega'].shift(1)
    df['omega_lag2'] = df['omega'].shift(2)

    # Remove initial observations as a "burn-in period"
    df = df.iloc[1000:, ]

    # Check whether forecast errors for the
    # model follow the prediction
    G = get_kalman_gain(params_expectations['rho_perceived'], 
        params_expectations['sigma_eps_perceived'], 
        params_expectations['sigma_omega_perceived'])
    test_1 = df['forecast_error']- params_DGP['rho'] * (1 - G) * df['forecast_error_lag']
    test_2 = (df['eps'] - params_DGP['rho'] * params_expectations['theta'] * G * df['eps_lag'] 
        - params_DGP['rho'] * G * (1 + params_expectations['theta']) * (df['omega_lag1']- params_DGP['rho'] * params_expectations['theta'] / (1 + params_expectations['theta']) * df['omega_lag2']))
    assert_allclose(test_1.values[1:], test_2.values[1:], atol = 1e-8)


def test_full_expectations():
    # Check that full expectations
    # are correctly calculated using the 
    # ARMA(2, 2) representation

    # Parameters of DGP
    params_DGP = {'rho': 0.8,
                  'sigma_eps': 1.5,
                  'sigma_omega': 3.5,
                  'T': 1000 + 5000}

    # Expectations parameters
    params_expectations = {'rho_perceived': 0.65,
                           'sigma_eps_perceived': 1.25,
                           'sigma_omega_perceived': 5.0,
                           'theta': 0.50}

    # Generate data
    df = simulate_dataset(params_DGP, params_expectations)
    df['forecast_error_lag1'] = df['forecast_error'].shift(1)
    df['forecast_error_lag2'] = df['forecast_error'].shift(2)
    df['eps_lag1'] = df['eps'].shift(1)
    df['eps_lag2'] = df['eps'].shift(2)
    df['omega_lag1'] = df['omega'].shift(1)
    df['omega_lag2'] = df['omega'].shift(2)
    df['omega_lag3'] = df['omega'].shift(3)

    # Remove initial observations as a "burn-in period"
    df = df.iloc[1000:, ]

    # Unpack parameters
    rho = params_DGP['rho']
    sigma_eps = params_DGP['sigma_eps']
    sigma_omega = params_DGP['sigma_omega']

    rho_perceived = params_expectations['rho_perceived']
    sigma_eps_perceived = params_expectations['sigma_eps_perceived']
    sigma_omega_perceived = params_expectations['sigma_omega_perceived']
    theta = params_expectations['theta']

    # Check whether forecast errors for the
    # model follow the prediction
    G = get_kalman_gain(rho_perceived, sigma_eps_perceived, sigma_omega_perceived)
    test_1 = df['forecast_error'] - (rho_perceived * (1 - G) + rho) * df['forecast_error_lag1'] + rho_perceived * rho * (1 - G) * df['forecast_error_lag2']
    test_2 = df['eps'] - rho_perceived * (1 + theta * G) * df['eps_lag1'] + rho_perceived ** 2 * theta * G * df['eps_lag2']
    test_2 += -rho_perceived * G * (1 + theta) * (df['omega_lag1'] - (rho + theta * (rho_perceived + rho)) / (1 + theta) * df['omega_lag2'] + theta * rho * rho_perceived / (1 + theta) * df['omega_lag3'])
    assert_allclose(test_1.values[1:], test_2.values[1:], atol = 1e-8)


def test_bias_coefs_aggregate():
    # Check whether theoretical formulas for
    # shock-specific bias coefficients match
    # numerically obtained bias coefficients

    # Parameters of DGP
    params_DGP = {'rho': 0.8,
                  'sigma_eps': 1.5,
                  'sigma_omega': 3.5,
                  'T': 1000 + 5000}

    # Expectations parameters
    params_expectations = {'rho_perceived': 0.75,
                           'sigma_eps_perceived': 1.25,
                           'sigma_omega_perceived': 5.0,
                           'theta': 0.50}

    # Generate data
    df = simulate_dataset(params_DGP, params_expectations)

    # Remove initial observations as a "burn-in period"
    df = df.iloc[1000:, ]

    # Get theoretical values of bias coefficients
    max_L = 100
    bias_coefs = calculate_shock_specific_bias_coefs(params_DGP, params_expectations, max_L)
    
    # Estimate bias coefficients numerically
    for var in ['eps', 'omega']:
        for ll in range(1, max_L + 1):
            df['{}_LAG{}'.format(var, ll)] = df[var].shift(ll)

    formula = 'forecast_error ~ eps'
    for ll in range(1, max_L + 1):
        formula += ' + eps_LAG{}'.format(ll)
        formula += ' + omega_LAG{}'.format(ll)

    results = smf.ols(formula, data = df).fit()

    IRF_eps   = []
    IRF_omega = []
    for ll in range(1, max_L + 1):
        IRF_eps.append(results.params['eps_LAG{}'.format(ll)])
        IRF_omega.append(results.params['omega_LAG{}'.format(ll)])

    assert_allclose(-np.array(IRF_eps), bias_coefs['aggregate'], atol = 1e-8)


def test_bias_coefs_aggregate_diagnostic():
    # Check whether theoretical formulas for
    # shock-specific bias coefficients match
    # those for special cases

    # Parameters of DGP
    params_DGP = {'rho': 0.8,
                  'sigma_eps': 1.5,
                  'sigma_omega': 3.5,
                  'T': 1000 + 5000}

    # Expectations parameters
    params_expectations = {'rho_perceived': params_DGP['rho'],
                           'sigma_eps_perceived': params_DGP['sigma_eps'],
                           'sigma_omega_perceived': params_DGP['sigma_omega'],
                           'theta': 0.75}

    # Get theoretical values of bias coefficients
    max_L = 100
    bias_coefs = calculate_shock_specific_bias_coefs(params_DGP, params_expectations, max_L)
    
    # Calculate using a closed-form solution
    L = np.array(range(1, max_L + 1))
    rho = params_DGP['rho']
    theta = params_expectations['theta']
    G = get_kalman_gain(rho = params_expectations['rho_perceived'], 
        sigma_eps = params_expectations['sigma_eps_perceived'], 
        sigma_omega = params_expectations['sigma_omega_perceived'])
    test = -rho ** L * (1 - G) ** L + theta * G * rho ** L * (1 - G) ** (L - 1)

    assert_allclose(bias_coefs['aggregate'], test, atol = 1e-8)


def test_bias_coefs_idiosyncratic():
    # Check whether theoretical formulas for
    # shock-specific bias coefficients match
    # numerically obtained bias coefficients

    # Parameters of DGP
    params_DGP = {'rho': 0.8,
                  'sigma_eps': 1.5,
                  'sigma_omega': 3.5,
                  'T': 1000 + 5000}

    # Expectations parameters
    params_expectations = {'rho_perceived': 0.75,
                           'sigma_eps_perceived': 1.25,
                           'sigma_omega_perceived': 5.0,
                           'theta': 0.50}

    # Generate data
    df = simulate_dataset(params_DGP, params_expectations)

    # Remove initial observations as a "burn-in period"
    df = df.iloc[1000:, ]

    # Get theoretical values of bias coefficients
    max_L = 100
    bias_coefs = calculate_shock_specific_bias_coefs(params_DGP, params_expectations, max_L)
    
    # Estimate bias coefficients numerically
    for var in ['eps', 'omega']:
        for ll in range(1, max_L + 1):
            df['{}_LAG{}'.format(var, ll)] = df[var].shift(ll)

    formula = 'forecast_error ~ eps'
    for ll in range(1, max_L + 1):
        formula += ' + eps_LAG{}'.format(ll)
        formula += ' + omega_LAG{}'.format(ll)

    results = smf.ols(formula, data = df).fit()

    IRF_eps   = []
    IRF_omega = []
    for ll in range(1, max_L + 1):
        IRF_eps.append(results.params['eps_LAG{}'.format(ll)])
        IRF_omega.append(results.params['omega_LAG{}'.format(ll)])

    assert_allclose(-np.array(IRF_omega), bias_coefs['idiosyncratic'], atol = 1e-8)


def test_autocov():
    theta = np.array([1, 0.5, -0.75, 0.25])
    sigma = 0.85
    autocovar = get_autocovar_MA(theta, sigma)

    # Calculate using statsmodels as a test
    test = statsmodels.tsa.arima_process.arma_acovf(ar = np.array([1.0]), 
        ma = theta, 
        nobs = len(theta), 
        sigma2 = sigma ** 2)

    assert_allclose(autocovar, test)


def test_autocov_2():
    theta = np.array([1, -0.75, 1.10])
    sigma = 1.5
    autocovar = get_autocovar_MA(theta, sigma)

    # Calculate using statsmodels as a test
    test = statsmodels.tsa.arima_process.arma_acovf(ar = np.array([1.0]), 
        ma = theta, 
        nobs = len(theta), 
        sigma2 = sigma ** 2)

    assert_allclose(autocovar, test)


def test_MA_sum():
    # Test of representation of sum of
    # two independent MA processes
    theta_1 = np.array([1, 0.75, 0.35])
    sigma_1 = 1.5

    theta_2 = np.array([1, -0.25, 0.15])
    sigma_2 = 0.5

    # Calculate MA representation numerically
    res = get_MA_coefs_sum(theta_1, sigma_1, theta_2, sigma_2)
    
    # Calculate autocovariances
    autocov_1 = get_autocovar_MA(theta_1, sigma_1)
    autocov_2 = get_autocovar_MA(theta_2, sigma_2)
    autocov = autocov_1 + autocov_2

    test = get_autocovar_MA(res['theta'], res['sigma'])
    assert_allclose(autocov, test)


def test_MA_sum_2():
    # Test of representation of sum of
    # two independent MA processes
    theta_1 = np.array([1, 0.75, 0.35])
    sigma_1 = 1.5

    theta_2 = theta_1
    sigma_2 = 0.5

    # Calculate MA representation numerically
    res = get_MA_coefs_sum(theta_1, sigma_1, theta_2, sigma_2)

    assert_allclose(res['theta'], theta_1)


def test_MA_sum_3():
    # Test of representation of sum of
    # two independent MA processes
    theta_1 = np.array([1, 0.75, 0.35])
    sigma_1 = 1.5

    theta_2 = theta_1
    sigma_2 = 0.5

    # Calculate MA representation numerically
    res = get_MA_coefs_sum(theta_1, sigma_1, theta_2, sigma_2)

    assert_allclose(res['sigma'], np.sqrt(sigma_1 ** 2 + sigma_2 ** 2))


def test_MA_sum_4():
    # Test of the represenation of two MA(1) processes
    # using a closed-form solution for two MA(1) processes
    # In this example
    #   x_t = w_t + z_t
    # where w_t and z_t are independent
    # invertible MA(1) processes with
    
    #   w_t = u_t + alpha * u_{t - 1}, u_t ~ IID (0, sU ** 2)
    #   z_t = v_t + beta  * v_{t - 1}, v_t ~ IID (0, sV ** 2)
    
    # Set parameters
    alpha = 0.30
    sU = 1.5
    
    beta = 0.75
    sV = 0.5

    # Collect to form necessary for functions    
    theta_1 = np.array([1, alpha])
    sigma_1 = sU

    theta_2 = np.array([1, beta])
    sigma_2 = sV

    # Calculate MA representation numerically
    res = get_MA_coefs_sum(theta_1, sigma_1, theta_2, sigma_2)
    
    sqrt_term = np.sqrt(((1 + alpha ** 2) * sU ** 2 + (1 + beta ** 2) * sV ** 2) ** 2 
                         - 4 * (alpha * sU ** 2 + beta * sV ** 2) ** 2)
    sE = np.sqrt((sU ** 2 * (1 + alpha ** 2) + sV ** 2 * (1 + beta ** 2) + sqrt_term) / 2.0)
    theta = ((sU ** 2 * (1 + alpha ** 2) + sV ** 2 * (1 + beta ** 2) - sqrt_term) 
             / (2.0 * (alpha * sU ** 2 + beta * sV ** 2)))

    assert_allclose(res['theta'], np.insert(np.array([theta]), 0, 1.0))


def test_MA_sum_5():
    # Test of the represenation of two MA(1) processes
    # using a closed-form solution for two MA(1) processes
    # In this example
    #   x_t = w_t + z_t
    # where w_t and z_t are independent
    # invertible MA(1) processes with
    
    #   w_t = u_t + alpha * u_{t - 1}, u_t ~ IID (0, sU ** 2)
    #   z_t = v_t + beta  * v_{t - 1}, v_t ~ IID (0, sV ** 2)
    
    # Set parameters
    alpha = 0.30
    sU = 1.5
    
    beta = 0.75
    sV = 0.5

    # Collect to form necessary for functions    
    theta_1 = np.array([1, alpha])
    sigma_1 = sU

    theta_2 = np.array([1, beta])
    sigma_2 = sV

    # Calculate MA representation numerically
    res = get_MA_coefs_sum(theta_1, sigma_1, theta_2, sigma_2)
    
    sqrt_term = np.sqrt(((1 + alpha ** 2) * sU ** 2 + (1 + beta ** 2) * sV ** 2) ** 2 
                         - 4 * (alpha * sU ** 2 + beta * sV ** 2) ** 2)
    sE = np.sqrt((sU ** 2 * (1 + alpha ** 2) + sV ** 2 * (1 + beta ** 2) + sqrt_term) / 2.0)
    theta = ((sU ** 2 * (1 + alpha ** 2) + sV ** 2 * (1 + beta ** 2) - sqrt_term) 
             / (2.0 * (alpha * sU ** 2 + beta * sV ** 2)))

    assert_allclose(res['sigma'], sE)


def test_MA_sum_6():
    # Test of the represenation of two MA(1) processes
    # using an example from Hamilton (1994, p. 102--105)
    delta = 0.30
    sigma_u = 1.5
    sigma_v = 0.5

    # Collect to form necessary for functions    
    theta_1 = np.array([1, delta])
    sigma_1 = sigma_u

    theta_2 = np.array([1, 0.0])
    sigma_2 = sigma_v

    # Calculate MA representation numerically
    res = get_MA_coefs_sum(theta_1, sigma_1, theta_2, sigma_2)
    
    # Closed-form solution from Hamilton
    term = (1 + delta ** 2) + (sigma_v ** 2 / sigma_u ** 2)
    theta = (term - np.sqrt(term ** 2 - 4 * delta ** 2)) / (2 * delta)
    sigma = sigma_1 * np.sqrt(delta / theta)
    test = np.array([1, theta])

    assert_allclose(res['theta'], test)


def test_IRF_ARMA22():
    phi_1 = 0.75
    phi_2 = -0.30
    theta_1 = -0.8
    theta_2 = 0.4
    L = 15
    res = get_IRF_ARMA22(phi_1, phi_2, theta_1, theta_2, L)

    # Calculate the IRF using statsmodels
    test = statsmodels.tsa.arima_process.arma_impulse_response(ar = np.array([1, -phi_1, -phi_2]),
        ma = np.array([1, theta_1, theta_2]),
        leads = L)

    assert_allclose(res, test)


def test_IRF_ARMA22_2():
    phi_1 = 0.8
    phi_2 = 0.0
    theta_1 = 0.0
    theta_2 = 0.0
    L = 15
    res = get_IRF_ARMA22(phi_1, phi_2, theta_1, theta_2, L)

    # Calculate the IRF by hand
    test = phi_1 ** np.array(range(L))

    assert_allclose(res, test)


def test_IRF_ARMA22_3():
    phi_1 = 0.8
    phi_2 = 0.0
    theta_1 = -0.4
    theta_2 = 0.0
    L = 15
    res = get_IRF_ARMA22(phi_1, phi_2, theta_1, theta_2, L)

    # Calculate the IRF by hand
    test = np.zeros(L)
    test[0] = 1
    test[1] = phi_1 + theta_1
    for ll in range(2, L):
        test[ll] = test[ll - 1] * phi_1

    assert_allclose(res, test)


def test_composite_bias_coefs():
    # Test composite bias coefficients
    # against a case with a known closed-form
    # solution

    # Parameters of DGP
    params_DGP = {'rho': 0.8,
                  'sigma_eps': 1.5,
                  'sigma_omega': 3.5,
                  'T': 1000 + 5000}
    L = 12

    # Expectations parameters
    params_expectations = {'rho_perceived': params_DGP['rho'],
                           'sigma_eps_perceived': params_DGP['sigma_eps'],
                           'sigma_omega_perceived': params_DGP['sigma_omega'] * 0.75,
                           'theta': 0.00}

    # Calculate bias coefficients numerically
    bias_coefs = calculate_composite_bias_coefs(params_DGP, params_expectations, L)

    # Closed-form solution
    G = get_kalman_gain(params_expectations['rho_perceived'], 
        params_expectations['sigma_eps_perceived'], 
        params_expectations['sigma_omega_perceived'])
    test = (params_DGP['rho'] * (1 - G)) ** np.array(range(L + 1))
    test = -test[1:]

    assert_allclose(bias_coefs, test)


def test_composite_bias_coefs_2():
    # Test composite bias coefficients
    # against a case with a known closed-form
    # solution

    # Parameters of DGP
    params_DGP = {'rho': 0.8,
                  'sigma_eps': 1.5,
                  'sigma_omega': 1e-10,
                  'T': 1000 + 5000}
    L = 12

    # Expectations parameters
    params_expectations = {'rho_perceived': 0.95,
                           'sigma_eps_perceived': params_DGP['sigma_eps'],
                           'sigma_omega_perceived': params_DGP['sigma_omega'],
                           'theta': 0.00}

    # Calculate bias coefficients numerically
    bias_coefs = calculate_composite_bias_coefs(params_DGP, params_expectations, L)

    # Closed-form solution
    L_grid = np.array(range(1, L + 1))
    test = params_DGP['rho'] ** (L_grid - 1) * (params_expectations['rho_perceived'] - params_DGP['rho'])
    
    assert_allclose(bias_coefs, test)


def test_composite_bias_coefs_3():
    # Test composite bias coefficients
    # against shock-specific bias coefficients
    # when there are no idiosyncratic shocks

    # Parameters of DGP
    params_DGP = {'rho': 0.8,
                  'sigma_eps': 1.5,
                  'sigma_omega': 1e-10,
                  'T': 1000 + 5000}
    L = 12

    # Expectations parameters
    params_expectations = {'rho_perceived': 0.95,
                           'sigma_eps_perceived': 1.25,
                           'sigma_omega_perceived': params_DGP['sigma_omega'],
                           'theta': 0.50}

    # Calculate bias coefficients numerically
    bias_coefs = calculate_bias_coefs(params_DGP, params_expectations, L)  
    assert_allclose(bias_coefs['composite'], bias_coefs['aggregate'])


def test_composite_bias_coefs_4():
    # Test composite bias coefficients
    # against shock-specific bias coefficients
    # when there are no aggregate shocks

    # Parameters of DGP
    params_DGP = {'rho': 0.8,
                  'sigma_eps': 1e-10,
                  'sigma_omega': 3.0,
                  'T': 1000 + 5000}
    L = 12

    # Expectations parameters
    params_expectations = {'rho_perceived': 0.95,
                           'sigma_eps_perceived': 1.25,
                           'sigma_omega_perceived': 2.5,
                           'theta': 0.50}

    # Calculate bias coefficients numerically
    bias_coefs = calculate_bias_coefs(params_DGP, params_expectations, L)  
    assert_allclose(bias_coefs['composite'][0:(-1)], 
                    -bias_coefs['idiosyncratic'][1:] / bias_coefs['idiosyncratic'][0])


def test_composite_bias_coefs_5():
    
    # Test composite bias coefficients
    # against shock-specific bias coefficients
    # by calculating the implied variances
    # of forecast errors

    # Parameters of DGP
    params_DGP = {'rho': 0.8,
                  'sigma_eps': 1.5,
                  'sigma_omega': 3.0,
                  'T': 1000 + 5000}
    L = 50

    # Expectations parameters
    params_expectations = {'rho_perceived': 0.95,
                           'sigma_eps_perceived': 1.25,
                           'sigma_omega_perceived': 4.5,
                           'theta': 0.65}

    # Calculate variance of forecast errors
    # using the calculated composite bias coefficients
    bias_coefs = calculate_composite_bias_coefs(params_DGP, params_expectations, L, return_sigma = True)
    var_composite = bias_coefs['sigma'] ** 2 * (np.sum(bias_coefs['bias_coefficients'] ** 2) + 1)

    # Calculate variance of forecast errors
    # using the calculated shock-specific bias coefficients
    shock_specific = calculate_bias_coefs(params_DGP, params_expectations, L)
    test = (np.sum(shock_specific['aggregate'] ** 2) + 1) * params_DGP['sigma_eps'] ** 2 + np.sum(shock_specific['idiosyncratic'] ** 2) * params_DGP['sigma_omega'] ** 2
    
    assert_allclose(var_composite, test)


def test_composite_bias_coefs_6():
    
    # Test composite bias coefficients
    # by calculating variance of forecast
    # errors in a case with a known closed-form
    # solution

    # Parameters of DGP
    params_DGP = {'rho': 0.8,
                  'sigma_eps': 1.5,
                  'sigma_omega': 1e-10,
                  'T': 1000 + 5000}
    L = 50

    # Expectations parameters
    params_expectations = {'rho_perceived': 0.95,
                           'sigma_eps_perceived': 1.25,
                           'sigma_omega_perceived': params_DGP['sigma_omega'],
                           'theta': 0.00}

    # Calculate variance of forecast errors
    # using the calculated composite bias coefficients
    bias_coefs = calculate_composite_bias_coefs(params_DGP, params_expectations, L, return_sigma = True)
    var_composite = bias_coefs['sigma'] ** 2 * (np.sum(bias_coefs['bias_coefficients'] ** 2) + 1)

    # Calculate variance of forecast errors
    # using a closed-form solution
    sigma_eps = params_DGP['sigma_eps']
    rho = params_DGP['rho']
    rho_perceived = params_expectations['rho_perceived']
    test = sigma_eps ** 2 * (1 - 2 * rho * rho_perceived + rho_perceived ** 2) / (1 - rho ** 2)
    
    assert_allclose(var_composite, test)


def test_composite_bias_coefs_7():
    
    # Test composite bias coefficients
    # by calculating variance of forecast
    # errors in a case with a known closed-form
    # solution

    # Parameters of DGP
    params_DGP = {'rho': 0.8,
                  'sigma_eps': 1.5,
                  'sigma_omega': 1e-10,
                  'T': 1000 + 5000}
    L = 50

    # Expectations parameters
    params_expectations = {'rho_perceived': 0.95,
                           'sigma_eps_perceived': 1.25,
                           'sigma_omega_perceived': params_DGP['sigma_omega'],
                           'theta': 0.00}

    # Calculate variance of forecast errors
    # using the calculated composite bias coefficients
    bias_coefs = calculate_composite_bias_coefs(params_DGP, params_expectations, L, return_sigma = True)
    var_composite = bias_coefs['sigma'] ** 2 * (np.sum(bias_coefs['bias_coefficients'] ** 2) + 1)

    # Calculate variance of forecast errors
    # using a closed-form solution
    sigma_eps = params_DGP['sigma_eps']
    rho = params_DGP['rho']
    rho_perceived = params_expectations['rho_perceived']
    test = sigma_eps ** 2 * (1 - 2 * rho * rho_perceived + rho_perceived ** 2) / (1 - rho ** 2)
    
    assert_allclose(var_composite, test)


def test_composite_bias_coefs_8():
    
    # Test composite bias coefficients
    # by calculating variance of forecast
    # errors in a case with a known closed-form
    # solution

    # Parameters of DGP
    params_DGP = {'rho': 0.8,
                  'sigma_eps': 1.5,
                  'sigma_omega': 1e-10,
                  'T': 1000 + 5000}
    L = 50

    # Expectations parameters
    params_expectations = {'rho_perceived': params_DGP['rho'],
                           'sigma_eps_perceived': params_DGP['sigma_eps'],
                           'sigma_omega_perceived': params_DGP['sigma_omega'],
                           'theta': 0.70}

    # Calculate variance of forecast errors
    # using the calculated composite bias coefficients
    bias_coefs = calculate_composite_bias_coefs(params_DGP, params_expectations, L, return_sigma = True)
    var_composite = bias_coefs['sigma'] ** 2 * (np.sum(bias_coefs['bias_coefficients'] ** 2) + 1)

    # Calculate variance of forecast errors
    # using a closed-form solution
    sigma_eps = params_DGP['sigma_eps']
    rho = params_DGP['rho']
    theta = params_expectations['theta']
    test = sigma_eps ** 2 * (1 + rho ** 2 * theta ** 2)
    
    assert_allclose(var_composite, test)


def test_composite_bias_coefs_9():

    np.random.seed(20182018)
    
    # Test whether composite bias coefficients
    # are "in between" IRFs w.r.t. specific shocks

    # Parameters of DGP
    params_DGP = {'rho': 0.8,
                  'sigma_eps': 1.5,
                  'sigma_omega': 3.0,
                  'T': 1000 + 5000}
    L = 50

    # Expectations parameters
    params_expectations = {'rho_perceived': 0.9,
                           'sigma_eps_perceived': 2.0,
                           'sigma_omega_perceived': 2.5,
                           'theta': 0.70}

    # Calculate variance of forecast errors
    # using the calculated composite bias coefficients
    bias_coefs = calculate_bias_coefs(params_DGP, params_expectations, L)

    # Estimate IRFs w.r.t. shocks numerically

    # Generate data
    df = simulate_dataset(params_DGP, params_expectations)
    
    # Estimate IRFs via regressions on shocks
    for var in ['eps', 'omega']:
        for ll in range(1, L + 1):
            df['{}_LAG{}'.format(var, ll)] = df[var].shift(ll)

    formula = 'forecast_error ~ eps'
    for ll in range(1, L + 1):
        formula += ' + eps_LAG{}'.format(ll)
        formula += ' + omega_LAG{}'.format(ll)

    results = smf.ols(formula, data = df).fit()

    # Collect results
    IRF_eps   = []
    IRF_omega = []
    for ll in range(1, L + 1):
        IRF_eps.append(results.params['eps_LAG{}'.format(ll)])
        IRF_omega.append(results.params['omega_LAG{}'.format(ll)])
    IRF_eps = np.array(IRF_eps)
    IRF_eps = np.insert(IRF_eps, 0, 1.0)
    IRF_eps = IRF_eps[0:(-1)]
    IRF_omega = np.array(IRF_omega)
    IRF_omega = IRF_omega / IRF_omega[0] # Normalize

    IRF_composite = -bias_coefs['composite'].values 
    IRF_composite = np.insert(IRF_composite, 0, 1.0)
    IRF_composite = IRF_composite[0:(-1)]

    assert (IRF_eps <= IRF_composite).all() and (IRF_composite <= IRF_omega).all()


def test_composite_bias_coefs_10():

    np.random.seed(20182018)
    
    # Test whether composite bias coefficients
    # are "in between" IRFs w.r.t. specific shocks

    # Parameters of DGP
    params_DGP = {'rho': 0.8,
                  'sigma_eps': 1.5,
                  'sigma_omega': 3.0,
                  'T': 1000 + 5000}
    L = 30

    # Expectations parameters
    params_expectations = {'rho_perceived': params_DGP['rho'],
                           'sigma_eps_perceived': params_DGP['sigma_eps'],
                           'sigma_omega_perceived': params_DGP['sigma_omega'],
                           'theta': 0.70}

    # Calculate variance of forecast errors
    # using the calculated composite bias coefficients
    bias_coefs = calculate_bias_coefs(params_DGP, params_expectations, L)

    # Estimate IRFs w.r.t. shocks numerically

    # Generate data
    df = simulate_dataset(params_DGP, params_expectations)
    
    # Estimate IRFs via regressions on shocks
    for var in ['eps', 'omega']:
        for ll in range(1, L + 1):
            df['{}_LAG{}'.format(var, ll)] = df[var].shift(ll)

    formula = 'forecast_error ~ eps'
    for ll in range(1, L + 1):
        formula += ' + eps_LAG{}'.format(ll)
        formula += ' + omega_LAG{}'.format(ll)

    results = smf.ols(formula, data = df).fit()

    # Collect results
    IRF_eps   = []
    IRF_omega = []
    for ll in range(1, L + 1):
        IRF_eps.append(results.params['eps_LAG{}'.format(ll)])
        IRF_omega.append(results.params['omega_LAG{}'.format(ll)])
    IRF_eps = np.array(IRF_eps)
    IRF_eps = np.insert(IRF_eps, 0, 1.0)
    IRF_eps = IRF_eps[0:(-1)]
    IRF_omega = np.array(IRF_omega)
    IRF_omega = IRF_omega / IRF_omega[0] # Normalize

    IRF_composite = -bias_coefs['composite'].values 
    IRF_composite = np.insert(IRF_composite, 0, 1.0)
    IRF_composite = IRF_composite[0:(-1)]

    assert (IRF_omega <= IRF_composite).all() and (IRF_composite <= IRF_eps).all()


def test_composite_measurement_error():

    # Test whether composite bias coefficients
    # in case of measurement error coincide with
    # analytical expressions in a special case

    # Parameters of DGP
    params_DGP = {'rho': 0.8,
                  'sigma_eps': 1.5,
                  'sigma_omega': 1e-8,
                  'T': 1000 + 5000}
    L = 30
    sigma_m = 1.5

    # Expectations parameters
    params_expectations = {'rho_perceived': params_DGP['rho'],
                           'sigma_eps_perceived': params_DGP['sigma_eps'],
                           'sigma_omega_perceived': params_DGP['sigma_omega'],
                           'theta': 0.70}

    # Calculate composite bias coefficients numerically
    bias_coefs = calculate_composite_bias_coefs_noisy(params_DGP, params_expectations, L, sigma_m)

    # Calculate composite bias coefficients using an
    # analytical formula
    sigma_eps = params_DGP['sigma_eps']
    theta = params_expectations['theta']
    rho = params_DGP['rho']
    
    term = sigma_m ** 2 / (sigma_eps ** 2) + (1 + (theta * rho) ** 2)
    psi = (term - np.sqrt(term ** 2 - 4 * (theta * rho) ** 2)) / (-2 * theta * rho)
    test = np.zeros(L)
    test[0] = -psi

    assert_allclose(bias_coefs, test, atol = 1e-8)


def test_calibration_noisy():
    # Parameters of DGP
    params_DGP = {'rho': 0.8,
                  'T': np.nan}

    # Expectations parameters
    params_expectations = {'rho_perceived': params_DGP['rho'],
                           'sigma_eps_perceived': 1.0,
                           'sigma_omega_perceived': 2.5,
                           'theta': 0.00}

    # Get "empirical" values of bias coefficients
    max_L = 12 + 1
    bias_coefs = calculate_shock_specific_bias_coefs(params_DGP, params_expectations, max_L)
    df_empirical = pd.DataFrame()
    df_empirical['aggregate_empirical'] = bias_coefs['aggregate']
    df_empirical['idiosyncratic_empirical'] = bias_coefs['idiosyncratic']
    df_empirical['idiosyncratic_empirical'] = df_empirical['idiosyncratic_empirical'] / df_empirical['idiosyncratic_empirical'][0]
    df_empirical['idiosyncratic_empirical'] = df_empirical['idiosyncratic_empirical'].shift(-1)
    df_empirical = df_empirical.iloc[0:(-1), ]
    df_empirical['L'] = range(1, len(df_empirical['aggregate_empirical']) + 1)

    # Get empirical fit
    res_baseline_noisy = fit_model(df_empirical = df_empirical, 
                                   params_DGP = params_DGP, 
                                   model = 'baseline_noisy')

    # Compare Kalman gains
    G_true = get_kalman_gain(rho = params_expectations['rho_perceived'], 
        sigma_eps = params_expectations['sigma_eps_perceived'], 
        sigma_omega = params_expectations['sigma_omega_perceived'])
    assert_allclose(G_true, res_baseline_noisy['G'], atol = 1e-5)


def test_calibration_misperceived():
    # Parameters of DGP
    params_DGP = {'rho': 0.8,
                  'T': np.nan}

    # Expectations parameters
    params_expectations = {'rho_perceived': 0.6,
                           'sigma_eps_perceived': 1.0,
                           'sigma_omega_perceived': 1e-6,
                           'theta': 0.00}

    # Get "empirical" values of bias coefficients
    max_L = 12 + 1
    bias_coefs = calculate_shock_specific_bias_coefs(params_DGP, params_expectations, max_L)
    df_empirical = pd.DataFrame()
    df_empirical['aggregate_empirical'] = bias_coefs['aggregate']
    df_empirical['idiosyncratic_empirical'] = bias_coefs['idiosyncratic']
    df_empirical['idiosyncratic_empirical'] = df_empirical['idiosyncratic_empirical'] / df_empirical['idiosyncratic_empirical'][0]
    df_empirical['idiosyncratic_empirical'] = df_empirical['idiosyncratic_empirical'].shift(-1)
    df_empirical = df_empirical.iloc[0:(-1), ]
    df_empirical['L'] = range(1, len(df_empirical['aggregate_empirical']) + 1)

    # Get empirical fit
    res_baseline_noisy = fit_model(df_empirical = df_empirical, 
                                   params_DGP = params_DGP, 
                                   model = 'misperception')

    # Compare perceived persistence
    assert_allclose(params_expectations['rho_perceived'], res_baseline_noisy['rho_perceived'], atol = 1e-5)


def test_calibration_noisy_misperceived_1():
    # Parameters of DGP
    params_DGP = {'rho': 0.8,
                  'T': np.nan}

    # Expectations parameters
    params_expectations = {'rho_perceived': 0.6,
                           'sigma_eps_perceived': 1.0,
                           'sigma_omega_perceived': 2.5,
                           'theta': 0.00}

    # Get "empirical" values of bias coefficients
    max_L = 12 + 1
    bias_coefs = calculate_shock_specific_bias_coefs(params_DGP, params_expectations, max_L)
    df_empirical = pd.DataFrame()
    df_empirical['aggregate_empirical'] = bias_coefs['aggregate']
    df_empirical['idiosyncratic_empirical'] = bias_coefs['idiosyncratic']
    df_empirical['idiosyncratic_empirical'] = df_empirical['idiosyncratic_empirical'] / df_empirical['idiosyncratic_empirical'][0]
    df_empirical['idiosyncratic_empirical'] = df_empirical['idiosyncratic_empirical'].shift(-1)
    df_empirical = df_empirical.iloc[0:(-1), ]
    df_empirical['L'] = range(1, len(df_empirical['aggregate_empirical']) + 1)

    # Get empirical fit
    res_baseline_noisy = fit_model(df_empirical = df_empirical, 
                                   params_DGP = params_DGP, 
                                   model = 'noisy_misperception')

    # Compare perceived persistence
    assert_allclose(params_expectations['rho_perceived'], res_baseline_noisy['rho_perceived'], atol = 1e-5)


def test_calibration_noisy_misperceived_2():
    # Parameters of DGP
    params_DGP = {'rho': 0.8,
                  'T': np.nan}

    # Expectations parameters
    params_expectations = {'rho_perceived': 0.6,
                           'sigma_eps_perceived': 1.0,
                           'sigma_omega_perceived': 2.5,
                           'theta': 0.00}

    # Get "empirical" values of bias coefficients
    max_L = 12 + 1
    bias_coefs = calculate_shock_specific_bias_coefs(params_DGP, params_expectations, max_L)
    df_empirical = pd.DataFrame()
    df_empirical['aggregate_empirical'] = bias_coefs['aggregate']
    df_empirical['idiosyncratic_empirical'] = bias_coefs['idiosyncratic']
    df_empirical['idiosyncratic_empirical'] = df_empirical['idiosyncratic_empirical'] / df_empirical['idiosyncratic_empirical'][0]
    df_empirical['idiosyncratic_empirical'] = df_empirical['idiosyncratic_empirical'].shift(-1)
    df_empirical = df_empirical.iloc[0:(-1), ]
    df_empirical['L'] = range(1, len(df_empirical['aggregate_empirical']) + 1)

    # Get empirical fit
    res_baseline_noisy = fit_model(df_empirical = df_empirical, 
                                   params_DGP = params_DGP, 
                                   model = 'noisy_misperception')

    # Compare Kalman gains
    G_true = get_kalman_gain(rho = params_expectations['rho_perceived'], 
        sigma_eps = params_expectations['sigma_eps_perceived'], 
        sigma_omega = params_expectations['sigma_omega_perceived'])
    assert_allclose(G_true, res_baseline_noisy['G'], atol = 1e-5)


def test_calibration_noisy_diagnostic_1():
    # Parameters of DGP
    params_DGP = {'rho': 0.8,
                  'T': np.nan}

    # Expectations parameters
    params_expectations = {'rho_perceived': params_DGP['rho'],
                           'sigma_eps_perceived': 1.0,
                           'sigma_omega_perceived': 2.5,
                           'theta': 0.80}

    # Get "empirical" values of bias coefficients
    max_L = 12 + 1
    bias_coefs = calculate_shock_specific_bias_coefs(params_DGP, params_expectations, max_L)
    df_empirical = pd.DataFrame()
    df_empirical['aggregate_empirical'] = bias_coefs['aggregate']
    df_empirical['idiosyncratic_empirical'] = bias_coefs['idiosyncratic']
    df_empirical['idiosyncratic_empirical'] = df_empirical['idiosyncratic_empirical'] / df_empirical['idiosyncratic_empirical'][0]
    df_empirical['idiosyncratic_empirical'] = df_empirical['idiosyncratic_empirical'].shift(-1)
    df_empirical = df_empirical.iloc[0:(-1), ]
    df_empirical['L'] = range(1, len(df_empirical['aggregate_empirical']) + 1)

    # Get empirical fit
    res_baseline_noisy = fit_model(df_empirical = df_empirical, 
                                   params_DGP = params_DGP, 
                                   model = 'noisy_diagnostic')

    # Compare perceived persistence
    assert_allclose(params_expectations['theta'], res_baseline_noisy['theta'], atol = 1e-5)


def test_calibration_noisy_diagnostic_2():
    # Parameters of DGP
    params_DGP = {'rho': 0.8,
                  'T': np.nan}

    # Expectations parameters
    params_expectations = {'rho_perceived': params_DGP['rho'],
                           'sigma_eps_perceived': 1.0,
                           'sigma_omega_perceived': 2.5,
                           'theta': 0.80}

    # Get "empirical" values of bias coefficients
    max_L = 12 + 1
    bias_coefs = calculate_shock_specific_bias_coefs(params_DGP, params_expectations, max_L)
    df_empirical = pd.DataFrame()
    df_empirical['aggregate_empirical'] = bias_coefs['aggregate']
    df_empirical['idiosyncratic_empirical'] = bias_coefs['idiosyncratic']
    df_empirical['idiosyncratic_empirical'] = df_empirical['idiosyncratic_empirical'] / df_empirical['idiosyncratic_empirical'][0]
    df_empirical['idiosyncratic_empirical'] = df_empirical['idiosyncratic_empirical'].shift(-1)
    df_empirical = df_empirical.iloc[0:(-1), ]
    df_empirical['L'] = range(1, len(df_empirical['aggregate_empirical']) + 1)

    # Get empirical fit
    res_baseline_noisy = fit_model(df_empirical = df_empirical, 
                                   params_DGP = params_DGP, 
                                   model = 'noisy_diagnostic')

    # Compare Kalman gains
    G_true = get_kalman_gain(rho = params_expectations['rho_perceived'], 
        sigma_eps = params_expectations['sigma_eps_perceived'], 
        sigma_omega = params_expectations['sigma_omega_perceived'])
    assert_allclose(G_true, res_baseline_noisy['G'], atol = 1e-5)